{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "execution": {
     "iopub.execute_input": "2023-09-15T15:43:08.277487Z",
     "iopub.status.busy": "2023-09-15T15:43:08.277204Z",
     "iopub.status.idle": "2023-09-15T15:43:10.519650Z",
     "shell.execute_reply": "2023-09-15T15:43:10.518367Z"
    }
   },
   "outputs": [],
   "source": [
    "# this script produces fuzzy matches at the city, province and national level\n",
    "# in files df_city.parquet, df_province.parquet and df_national.parquet\n",
    "\n",
    "import time\n",
    "import os\n",
    "import sys\n",
    "import pickle\n",
    "import polars as pl\n",
    "import numpy as np\n",
    "from concurrent.futures import ProcessPoolExecutor\n",
    "from collections import defaultdict\n",
    "from functools import partial\n",
    "from sklearn.metrics.pairwise import cosine_similarity\n",
    "from sklearn.feature_extraction.text import TfidfVectorizer\n",
    "\n",
    "sys.path.append('../')\n",
    "import tools\n",
    "\n",
    "# CHANGE\n",
    "PATH_OUTPUT = '' # curated data\n",
    "PATH_DATA = '' # analysis data\n",
    "\n",
    "pl.Config.set_fmt_str_lengths(100);"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "execution": {
     "iopub.execute_input": "2023-09-15T15:43:10.524315Z",
     "iopub.status.busy": "2023-09-15T15:43:10.523786Z",
     "iopub.status.idle": "2023-09-15T15:43:19.309634Z",
     "shell.execute_reply": "2023-09-15T15:43:19.308686Z"
    }
   },
   "outputs": [],
   "source": [
    "# DATA PREPARATION\n",
    "\n",
    "start_time = time.time()\n",
    "\n",
    "def clean_company_names(df, mapper):\n",
    "    clean_series = (\n",
    "        df.select(['company_name'])\n",
    "        .join(mapper, on='company_name', how='left')\n",
    "        .to_series(1)\n",
    "    )\n",
    "    return (\n",
    "        df\n",
    "        .with_columns(clean_series)\n",
    "        .drop('company_name')\n",
    "        .unique(subset=['city', 'province', 'cleaned_name'])\n",
    "    )\n",
    "\n",
    "\n",
    "with open(PATH_DATA + 'mapping_cities.pickle', 'rb') as handle:\n",
    "    mapping_cities = pickle.load(handle)\n",
    "\n",
    "df_indeed = (\n",
    "    pl.read_parquet(\n",
    "        PATH_OUTPUT + 'indeed_all_jobs.parquet',\n",
    "        columns=['company_name', 'city', 'province']\n",
    "        )\n",
    "    .filter(pl.col('company_name').is_not_null())\n",
    "    .unique()\n",
    ")\n",
    "\n",
    "df_advan = (\n",
    "    pl.read_parquet(\n",
    "        PATH_OUTPUT + 'advan_companies.parquet',\n",
    "        columns=['company_name', 'city', 'province', 'naics']\n",
    "        )\n",
    "    .filter(pl.col('company_name').is_not_null())\n",
    "    .unique()\n",
    "    )\n",
    "\n",
    "\n",
    "# Data Preprocessing\n",
    "\n",
    "# collect all company names from Indeed and Advan\n",
    "unique_names = (\n",
    "    pl.concat([df_indeed, df_advan], how='diagonal')\n",
    "    ['company_name']\n",
    "    .unique()\n",
    "    .to_list()\n",
    ")\n",
    "\n",
    "\n",
    "# create mapper to map company names to cleaned names\n",
    "preprocess_mapper = pl.DataFrame(\n",
    "    {\n",
    "        \"company_name\": unique_names,\n",
    "        \"cleaned_name\": [\n",
    "            tools.replace_stopwords(name).lower().replace(\" \", \"\")\n",
    "            for name in unique_names\n",
    "        ]\n",
    "    }\n",
    ")\n",
    "preprocess_mapper.write_parquet(PATH_OUTPUT + 'preprocess_mapper.parquet')\n",
    "\n",
    "# Prepare data for matching\n",
    "\n",
    "df_indeed = clean_company_names(df_indeed, preprocess_mapper)\n",
    "df_advan = clean_company_names(df_advan, preprocess_mapper)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "execution": {
     "iopub.execute_input": "2023-09-15T15:43:19.313568Z",
     "iopub.status.busy": "2023-09-15T15:43:19.313239Z",
     "iopub.status.idle": "2023-09-15T15:43:35.685656Z",
     "shell.execute_reply": "2023-09-15T15:43:35.685022Z"
    }
   },
   "outputs": [],
   "source": [
    "# FIT VECTORIZER\n",
    "\n",
    "NGRAMS = 3\n",
    "\n",
    "def ngrams(string):\n",
    "    ngrams = zip(*[string[i:] for i in range(NGRAMS)])\n",
    "    return [''.join(ngram) for ngram in ngrams]\n",
    "\n",
    "vectorizer = TfidfVectorizer(analyzer=ngrams)\n",
    "vectorizer.fit(preprocess_mapper['cleaned_name'].to_list())\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "execution": {
     "iopub.execute_input": "2023-09-15T15:43:35.729974Z",
     "iopub.status.busy": "2023-09-15T15:43:35.729607Z",
     "iopub.status.idle": "2023-09-15T15:43:35.737851Z",
     "shell.execute_reply": "2023-09-15T15:43:35.737233Z"
    }
   },
   "outputs": [],
   "source": [
    "# MATCHING FUNCTIONS\n",
    "\n",
    "CHUNKS = 2000\n",
    "\n",
    "def split_list(lst, chunksize=20_000):\n",
    "    \"\"\"returns a list of lists of size chunksize\"\"\"\n",
    "    return [lst[i:i + chunksize] for i in range(0, len(lst), chunksize)]\n",
    "\n",
    "\n",
    "def find_matches(names_source, tfidfs_target):\n",
    "    tfidfs_source = vectorizer.transform(names_source)  # important for speed that names_source is list and not a series\n",
    "    mat_similarity = cosine_similarity(tfidfs_source, tfidfs_target)\n",
    "    indices = [i for i in mat_similarity.argmax(axis=1)]\n",
    "    scores = mat_similarity.max(axis=1)\n",
    "    return (indices, scores)\n",
    "\n",
    "\n",
    "def match_names(names_source, names_target, chunksize=2000):\n",
    "    \"\"\"Processes the data in chunks if too large (too avoid memory issues\"\"\"\n",
    "    tfidfs_target = vectorizer.transform(names_target)\n",
    "    len_source = len(names_source)\n",
    "    if len_source > chunksize:\n",
    "        chunks = np.array_split(names_source, int(len_source / chunksize) + 1)\n",
    "        res = [find_matches(chunk, tfidfs_target) for chunk in chunks]\n",
    "\n",
    "        indices, scores = [], []\n",
    "        for res_indices, res_scores in res:\n",
    "            indices.extend(res_indices)\n",
    "            scores.extend(res_scores)\n",
    "    else:\n",
    "        indices, scores = find_matches(names_source, tfidfs_target)\n",
    "\n",
    "    matches = [names_target[index] for index in indices]\n",
    "    return matches, scores"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "execution": {
     "iopub.execute_input": "2023-09-15T15:43:35.740978Z",
     "iopub.status.busy": "2023-09-15T15:43:35.740683Z",
     "iopub.status.idle": "2023-09-15T15:44:41.225858Z",
     "shell.execute_reply": "2023-09-15T15:44:41.224625Z"
    }
   },
   "outputs": [],
   "source": [
    "# MATCHING CITY LEVEL\n",
    "\n",
    "def prepare_lists_names(df):\n",
    "    return (\n",
    "        df\n",
    "        .groupby(['city', 'province'])\n",
    "        .agg(pl.col('cleaned_name'))\n",
    "    )\n",
    "\n",
    "def filter_names(df, location):\n",
    "    \"\"\"Returns list of company names in a given location\"\"\"\n",
    "    try:\n",
    "        l_company_names = (\n",
    "            df.filter(\n",
    "                (pl.col('city') == location[0]) & (pl.col('province') == location[1])\n",
    "                )\n",
    "            ['cleaned_name']\n",
    "            .to_list()[0]\n",
    "            )\n",
    "    except IndexError:  # no company in this location\n",
    "        l_company_names = []\n",
    "    return l_company_names\n",
    "\n",
    "def find_matches_city(location, names_source, names_target):\n",
    "    matches, scores = match_names(names_source, names_target)\n",
    "    d = {\n",
    "        'cleaned_name': names_source,\n",
    "        'match': matches,\n",
    "        'score': scores,\n",
    "        'city': [location[0]] * len(names_source),\n",
    "        'province': [location[1]] * len(names_source)\n",
    "        }\n",
    "    return d\n",
    "\n",
    "\n",
    "df_sources = prepare_lists_names(df_indeed)\n",
    "df_targets = prepare_lists_names(df_advan)\n",
    "\n",
    "locations, sources, targets = [], [], []\n",
    "for location in mapping_cities:\n",
    "    source = filter_names(df_sources, location)\n",
    "    target = []\n",
    "    for location_target in mapping_cities[location]:\n",
    "        target.extend(filter_names(df_targets, location_target))\n",
    "\n",
    "    locations.append(location)\n",
    "    sources.append(source)\n",
    "    targets.append(target)\n",
    "\n",
    "with ProcessPoolExecutor(max_workers=10) as executor:\n",
    "    ds = executor.map(find_matches_city, locations, sources, targets)\n",
    "\n",
    "\n",
    "df_city = pl.concat([pl.DataFrame(d) for d in ds], how='vertical')\n",
    "\n",
    "# Compute naics codes\n",
    "d_mapping_cities = defaultdict(list)\n",
    "\n",
    "for source, l_targets in list(mapping_cities.items()):\n",
    "    for target in l_targets:\n",
    "        d_mapping_cities['city'].append(source[0])\n",
    "        d_mapping_cities['province'].append(source[1])\n",
    "        d_mapping_cities['city_target'].append(target[0])\n",
    "        d_mapping_cities['province_target'].append(target[1])\n",
    "\n",
    "df_mapping_cities = pl.DataFrame(d_mapping_cities)\n",
    "df_mapping_cities.write_parquet(PATH_OUTPUT + 'df_mapping_cities.parquet')\n",
    "\n",
    "df_naics = (\n",
    "    # get naics codes for each (company, city, province) in df_advan\n",
    "    df_advan\n",
    "    .groupby(['city', 'province', 'cleaned_name'])  # groupby target location\n",
    "    .agg(pl.col('naics'))\n",
    "    .rename({'city': 'city_target', 'province': 'province_target'})\n",
    "    # map this to city, province in df_indeed\n",
    "    .join(\n",
    "        df_mapping_cities,\n",
    "        on=['city_target', 'province_target'],\n",
    "        how='left'\n",
    "        )\n",
    "    .explode('naics')\n",
    "    # get naics codes for each (company, city, province) in df_indeed\n",
    "    .groupby(['city', 'province', 'cleaned_name'])  # groupby source location\n",
    "    .agg(pl.col('naics'))\n",
    ")\n",
    "\n",
    "# add_naics_codes\n",
    "df_city = df_city.join(\n",
    "    df_naics, left_on=['city', 'province', 'match'],\n",
    "    right_on=['city', 'province', 'cleaned_name'],\n",
    "    how='left'\n",
    "    )\n",
    "\n",
    "df_city.write_parquet(PATH_OUTPUT + \"df_city.parquet\")\n",
    "\n",
    "print(f'Matching at city level: time elapsed={time.time() - start_time:.2f} s')\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "execution": {
     "iopub.execute_input": "2023-09-15T15:44:41.229933Z",
     "iopub.status.busy": "2023-09-15T15:44:41.229611Z",
     "iopub.status.idle": "2023-09-15T15:47:59.138240Z",
     "shell.execute_reply": "2023-09-15T15:47:59.136812Z"
    }
   },
   "outputs": [],
   "source": [
    "# PROVINCIAL LEVEL\n",
    "\n",
    "# match companies at provincial level\n",
    "\n",
    "def names_by_province(df):\n",
    "    return (\n",
    "        df\n",
    "        .groupby('province')\n",
    "        .agg(pl.col('cleaned_name').unique())\n",
    "        .sort('province')\n",
    "        )\n",
    "\n",
    "\n",
    "def find_matches_province(province, names_source, names_target):\n",
    "    matches, scores = match_names(names_source, names_target)\n",
    "    d = {\n",
    "        'cleaned_name': names_source,\n",
    "        'match': matches,\n",
    "        'score': scores,\n",
    "        'province': [province] * len(names_source)}\n",
    "    return d\n",
    "\n",
    "\n",
    "df_names = (\n",
    "    names_by_province(df_indeed)\n",
    "    .join(names_by_province(df_advan), on='province', how='left', suffix=\"_target\")\n",
    "    .drop_nulls()\n",
    "    .with_columns(pl.col('cleaned_name').apply(split_list))\n",
    "    .explode('cleaned_name')\n",
    ")\n",
    "\n",
    "with ProcessPoolExecutor(max_workers=10) as executor:\n",
    "    ds = executor.map(\n",
    "        find_matches_province,\n",
    "        df_names['province'].to_list(),\n",
    "        df_names['cleaned_name'].to_list(),\n",
    "        df_names['cleaned_name_target'].to_list()\n",
    "        )\n",
    "\n",
    "df_province = pl.concat([pl.DataFrame(d) for d in ds], how='vertical')\n",
    "\n",
    "# add naics codes\n",
    "\n",
    "df_naics = (\n",
    "    df_advan\n",
    "    .groupby(['province', 'cleaned_name'])\n",
    "    .agg(pl.col('naics'))\n",
    ")\n",
    "\n",
    "df_province = df_province.join(\n",
    "    df_naics,\n",
    "    left_on=['province', 'match'],\n",
    "    right_on=['province', 'cleaned_name'],\n",
    "    how='left'\n",
    ")\n",
    "\n",
    "df_province.write_parquet(PATH_OUTPUT + \"df_province.parquet\")\n",
    "print(f'Matching at provincial level: time elapsed={time.time() - start_time:.2f} s')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "execution": {
     "iopub.execute_input": "2023-09-15T15:47:59.142497Z",
     "iopub.status.busy": "2023-09-15T15:47:59.142139Z",
     "iopub.status.idle": "2023-09-15T16:05:04.224968Z",
     "shell.execute_reply": "2023-09-15T16:05:04.223336Z"
    }
   },
   "outputs": [],
   "source": [
    "# MATCHING NATIONAL LEVEL\n",
    "\n",
    "names_target = df_advan['cleaned_name'].unique().to_list()\n",
    "\n",
    "\n",
    "def find_matches_national_helper(names_source, names_target):\n",
    "    matches, scores = match_names(names_source, names_target)\n",
    "    d = {\n",
    "        'cleaned_name': names_source,\n",
    "        'match': matches,\n",
    "        'score': scores\n",
    "        }\n",
    "    return d\n",
    "\n",
    "\n",
    "find_matches_national = partial(\n",
    "    find_matches_national_helper, names_target=names_target)\n",
    "\n",
    "\n",
    "df_names = (\n",
    "    pl.DataFrame({\"cleaned_name\": [df_indeed['cleaned_name'].unique().to_list()]})\n",
    "    .with_columns(pl.col('cleaned_name').apply(split_list))\n",
    "    .explode('cleaned_name')\n",
    ")\n",
    "\n",
    "with ProcessPoolExecutor(max_workers=8) as executor:\n",
    "    ds = executor.map(find_matches_national, df_names['cleaned_name'].to_list())\n",
    "\n",
    "\n",
    "df_national = pl.concat([pl.DataFrame(d) for d in ds], how='vertical')\n",
    "\n",
    "# add naics codes\n",
    "df_naics = (\n",
    "    df_advan\n",
    "    .groupby('cleaned_name')\n",
    "    .agg(pl.col('naics'))\n",
    ")\n",
    "\n",
    "df_national = df_national.join(\n",
    "    df_naics,\n",
    "    left_on=['match'],\n",
    "    right_on=['cleaned_name'],\n",
    "    how='left'\n",
    ")\n",
    "\n",
    "df_national.write_parquet(PATH_OUTPUT + \"df_national.parquet\")\n",
    "\n",
    "print(f'Matching at national level: time elapsed={time.time() - start_time:.2f} s')\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3.10.12 ('env_indeed2')",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.12"
  },
  "vscode": {
   "interpreter": {
    "hash": "7120590dfa35e6512fb14e5e70b67446c3e78c7a5c027e908dbb14d6a3f8a0eb"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
